package com.zhang.third.day11;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @title: flink cdc sql 实现方式
 * @author: zhang
 * @date: 2022/4/16 09:52
 */
public class Example2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv
                .executeSql("create table clicks (" +
                        "ID int, " +
                        "username string," +
                        "url string," +
                        "primary key(ID) not enforced " +
                        ") WITH (" +
                        "'connector' = 'mysql-cdc'," +
                        "'hostname' = 'hadoop103'," +
                        "'port' = '3306'," +
                        "'username' = 'root'," +
                        "'password' = '000000'," +
                        "'database-name' = 'userbehavior'," +
                        "'table-name' = 'clicks'" +
                        ")");

        tableEnv
                .executeSql("select * from clicks").print();

    }
}
